package com.bfxy.rocketmq.producer.transaction;

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

public class TransactionListenerImpl implements TransactionListener {

    // 这里的两个方法分别实现了前面说的：这个对象主要做两件事情：1、异步的执行本地事物；2、做回查


    // 1、执行本地事物

    /**
     * @param message 回调过来的参数，TransactionListener中48行的message
     * @param o
     * @return
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
        System.out.println("-----执行本地事物（异步的去执行）-----");
        String callArg = (String) o;
        System.out.println("callArg: " + callArg);
        System.out.println("message: " + message);
        //tx.begin

        //数据库的落库操作，

        //LocalTransactionState有三种状态如下
        //1、LocalTransactionState.COMMIT_MESSAGE;//这句话执行完之后，会发送一条确认消息，确认MQ是否收到消息
        //2、LocalTransactionState.ROLLBACK_MESSAGE;//回滚，MQ会删除该消息。
        //3、LocalTransactionState.UNKNOW;//中间状态，表示我们这个消息不可达，就是没有达到MQ集群

        //tx.commit
        return LocalTransactionState.UNKNOW;
    }

    // 2、当消息处于中间阶段时，MQ会不断的重试，重试会连接到producer然后会回调check函数。这个事情是在这里实现的
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        System.out.println("-----回调消息检查-----" + messageExt);
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}
